AWS Data Wranglerを使ってpandas -> AthenaとAthena -> pandasをやってみた
どうも、DA 事業本部の大澤です。
AWS Data Wrangler を使って、”pandas DataFrame のデータを Athena のテーブルへ”、”Athena でのクエリ実行結果を pandas DataFrame へ”というのを試してみました。かなり便利だったので、どんな感じでできるかをご紹介します。
AWS Data Wrangler
2019年9月、Github上にAWS Data Wrangler(以下、Data Wrangler)が公開されました。Data Wranglerは、各種AWSサービスからデータを取得して、コーディングをサポートしてくれるPythonのモジュールです。
現在、Python を用いて、Amazon Athena(以下、Athena)や Amazon Redshift(以下、Redshift)からデータを取得して、ETL 処理を行う際、PyAthena や boto3、Pandas などを利用して行うことが多いかと思います。その際、本来実施したい ETL のコーディングまでに、接続設定を書いたり、各種コーディングが必要でした。Data Wragler を利用することで、Athena や Amazon S3(以下、S3)上の CSV から Pandas を利用するのが、数行で実施できたり、PySpark から Redshift に連携できるなど、お客様側は ETL の処理の記述内容に集中することができます。 本モジュールはインスタンスに対して pip でインストールできることに加え、Lambda Layer としての利用や Glue 上で egg ファイルをアップロードして利用することができます。
AWS Data Wrangler を使って、簡単に ETL 処理を実現する | Amazon Web Services ブログより
やってみる
インストール
Aws Data Wrangler を pip でインストールします。
pip install awswrangler
ライブラリの読み込みと準備
使用するライブラリを読み込み、各パラメータの設定等を行います。パラメータは必要に応じて変更してください。
import awswrangler import numpy as np import pandas as pd from os import path database = 'test_data_wrangler' # 検証用に使用するGlue/AthenaのDB名 bucket = '' # 使用するS3のバケット名 prefix = 'data_wrangler' # データの配置場所 s3_prefix = f's3://{bucket}/{prefix}' # AWS Data Wrangler の各処理はこのセッションを通じて行います session = awswrangler.Session()
検証用のデータベース作成
Glueのクライアントを使って、検証用にデータベースを作成します。
glue = session.boto3_session.client('glue') glue.create_database(DatabaseInput={'Name':database})
データ作成
乱数を生成し、1000行4列のデータを作成します。1-3列目は[0, 1)の実数、4列目は[0,10)の整数にします。
df = pd.DataFrame(np.hstack((np.random.rand(1000, 3), np.random.randint(0, 10, (1000, 1)))) , columns=['a', 'b', 'c', 'd']) df
pandas DataFrame -> Athena
AWS Data Wranglerを使って先ほど作成したDataFrameのデータをAthenaのテーブルへ挿れます。
自動的にParquet化し、指定した場所にデータをアップロードしてくれます。また、テーブルが存在しない場合は自動的に作成されます。テーブルがすでに存在し、レコードもすでに入ってる場合の挙動は引数のmode
によってコントロールできます。
そのほかの引数についてはドキュメントをご覧ください。
table='test' session.pandas.to_parquet( dataframe=df, database=database, table=table, path=path.join(s3_prefix, table), # データの保存場所 partition_cols='d', mode='overwrite_partitions', # 'append', 'overwrite', 'overwrite_partitions' preserve_index=False, compression='gzip' # None, 'snappy', 'gzip', 'lzo' )
実行後、アップロードされたParquetファイルのパス一覧が返されます。列dでパーティション化されていることが分かります。
Athena -> pandas DataFrame
次は先ほど作成したテーブルに対してクエリを実行し、その結果を取得してみます。
query = ''' select * from test ''' athena_df = session.pandas.read_sql_athena( sql=query, database=database ) athena_df
検証用のデータベース削除
検証が完了したので、最初に作成したデータベースを削除します。
glue.delete_database(Name=database)
さいごに
AWS Data Wrangler を使って、”pandas DataFrame-> Athena”と”Athena -> pandas DataFrame”をやってみた様子をご紹介しました。AWS Data Wrangler には今回ご紹介した内容以外にも多くのことが可能です。 うまく活用することで ETL 処理がかなり楽になりそうです!